VNODE ring Итак рафтер мы подключили, что делать дальше. Дальше нужно его заставить работать на нескольких машинах, для этого просто запускаем рафтер с именем ноды на самой ноде. И все вызовы call(Node,Msg) которые были заменяем на call({Node,Node},Msg). Рафтер не перестал работать но теперь его стало сложнее тестировать, но тестам на таком уровне мы доверяем больше чем просто пускать на одной ноде в песочнице. Теперь нужно построить хеш кольцо. Для этого я взял библиотеку из riak_core. Она довольно понятна и предсказуема. Вот например у нас генерация кластера из 3 нод, на каждой из которых запущено 4 виртуальных ноды. > cr:ring(). {12, [{0,0}, {121791803110908576516973736059690251637994378581,1}, {243583606221817153033947472119380503275988757162,1}, {365375409332725729550921208179070754913983135743,1}, {487167212443634306067894944238761006551977514324,1}, {608959015554542882584868680298451258189971892905,2}, {730750818665451459101842416358141509827966271486,2}, {852542621776360035618816152417831761465960650067,2}, {974334424887268612135789888477522013103955028648,2}, {1096126227998177188652763624537212264741949407229,3}, {1217918031109085765169737360596902516379943785810,3}, {1339709834219994341686711096656592768017938164391,3}, {1461501637330902918203684832716283019655932542972,3}]} Хеш функция cr:hash(Object) выдает для каждого объекта результат в виде одной из строки из этого списка, например cr:hash(100) == {608959015554542882584868680298451258189971892905,2}. Если вам нужно узнать последовательность реплик для определенно ключа вы используете функцию cr:rep(100). > cr:rep(100). [2,3,1] Чтобы peer по этому номеру можно вот так cr:peer(2) == 'cr2@127.0.0.1', это число которое указано вторым в конфигурации application:get_env(cr,peers) == [{'cr@127.0.0.1',9000,9001,9002}, {'cr2@127.0.0.1',9004,9005,9006}, {'cr3@127.0.0.1',9008,9009,9010}]. Таким образом мы знаем последовательность реплик для каждого объекта из пространства ключей которое равно пространству vnode в системе. Любая vnode супервайзятся только на одном из пиров. Транзакции супервайзятся отдельно. У транзакшин менеджера в API logging backend состоит ищ трех операций: prepare, commit, rollback. Можно выполнять только односторонний вариант протокола без prepare, только commit и rollback(error). Эти команды можно было бы и выполнять в контексте рафтера, но у рафтера свой бинарный транзакшин лог. Формат его таков: RAFTER gen_fsm File Header Format ----------------------- <> Entry Format ---------------- <> Trailer Format ---------------- <> Главное чтобы в логе каждое сообщение было подписано, как и годится для любого приличного заведения от bit corruption как минимумум, также необходим трейлер чтобы узнать записано ли было предыдущее сообщение в лоне до конца. В riak_ensemble есть свое хранилище которое называется synctree, я же оставил оригинальный рафтеровский log. Вот сессия которая создает две записи в log: > cr_rafter:op({op,{new,table}}). > RAFTER APPEND Me: 'cr@127.0.0.1' Entry {rafter_entry,op,497,undefined, {new,table}} RAFTER LEADER op CONS NEW: {table} > cr_rafter:op({op,{put,table,1,#table{name=log}}}). RAFTER APPEND Me: 'cr@127.0.0.1' Entry {rafter_entry,op,497,undefined, {put,table,1, {table,log,undefined,[],[], disc_copies}}} RAFTER LEADER op CONS PUT: {table,108,{table,log,undefined,[],[],disc_copies}} {ok,#table{name = log,container = undefined,fields = [], keys = [],copy_type = disc_copies}} > ets:tab2list(table). [{108,{table,log,undefined,[],[],disc_copies}}] > cr:show(). [{84, #rafter_entry{type = op,term = 497,index = 84, cmd = {new,table}}}, {85, #rafter_entry{type = op,term = 497,index = 85, cmd = {put,table,1, #table{name = log,container = undefined,fields = [], keys = [],copy_type = disc_copies}}}}]. Так вот нам для chain replication нужен жеж свой лог, который будет хранить kvs операции, и сам будет храниться в kvs используя рекорды #log и #operation как базовый лог для каждой ноды chain replication сети. Делать это в отдельном логег мне кажется разумным, так как координатор этого лога не рафтер, а будущий chain replication node manager. Вот еще картинок нарисовал https://synrc.com/apps/cr/doc/cr.htm, надо будет это все в сюда в итоге подбить. Тут просто мысли оставляю, не судите строго. Я еще не начал ничего писать, только компоненты собираю, а набралось уже 2000 строк кода. Это все OTP gen_server и gen_fsm сжирают, завставляют что-то слишком много всего писать. Но ниче OTP так OTP, сделал вот такие виды сущностей в CR, это типа у нас новая генерация TPS, котрый был построен на riak_core и поддерживал data locality processing, где все запросы хендлились главным сервером, отвечающим за ключ, а если нет, то следующим. Это вместе с системными TCP листенерами, эрланговский childspec DSL: tcp(Name,Port,Mod,Nodes) -> {Name,{cr_tcp,start_link, [Name,Port,Mod,Nodes]}, permanent,2000,worker,[cr_tcp]}. pool(SupName) -> {SupName,{supervisor,start_link, [{local,SupName},cr_connection,[]]}, permanent,infinity,supervisor,[]}. vnode({I,N},Nodes) -> {{I,N},{cr_vnode,start_link, [{I,N},Nodes]}, permanent,2000,worker,[cr_vnode]}. xa(Id) -> {Id,{cr_xa,start_link, [Id]}, permanent,infinity,worker,[cr_xa]}. log({I,N},Nodes) -> {cr_log:logname(N),{cr_log,start_link, [N,#rafter_opts{cluster=Nodes}]}, permanent,2000,worker,[cr_log]}. rafter({I,N},Nodes) -> {N,{cr_rafter,start_link, [{I,N},#rafter_opts{state_machine=cr_replication,cluster=Nodes}]}, permanent,2000,worker,[cr_rafter]}. Бутстреп системы таким образом помещается в три функции: init([Nodes,Opts]) -> {ok, {{one_for_one, 5, 60}, lists:flatten([ protocol(O,Nodes) || O<-Opts ] ++ [ pool(vnode_sup) ] ++ [ log({0,N},Nodes) || {N,_,_,_} <- Nodes, N == node()] ++ [ rafter({0,N},Nodes) || {N,_,_,_} <- Nodes, N == node()]) }}. protocol({Name,Port,Mod},Nodes) -> SupName = list_to_atom(lists:concat([Name,'_',sup])), [ tcp(Name,Port,Mod,Nodes), % TCP listener gen_server pool(SupName) ]. % Accepted Clients Supervisor start_vnode({0,_Name},_Nodes) -> skip; start_vnode({Index,Name},Nodes) -> supervisor:start_child(vnode_sup,vnode({Index,Name},Nodes)).